Conversation
davidsbatista left a comment
@hande-k thank you for this contribution!
I left some initial comments/suggestions for improvements.
Thanks for the review @davidsbatista & @sjrl! I've addressed all the comments. A couple of notes:
Let me know if anything needs further adjustment!

@hande-k thanks for your patience with us! This is going to be a great addition to Haystack. Since it's the first version of a new abstraction, we really appreciate you working through all the comments with us as we refine how it fits in.

Hi @hande-k, just checking in, do you have time to continue working on this? Happy to help or take over some of the remaining changes if that's useful.

Hi @sjrl, thanks for the new comments and for offering your help with the requested changes, appreciate it!

@hande-k you're welcome and that sounds good to me!
def __init__(
    self, search_type: CogneeSearchType = "GRAPH_COMPLETION", top_k: int = 10, dataset_name: str | None = None
):
Following the pattern for our DocumentStores and their respective retrievers, I'd expect the init method to take a CogneeMemoryStore as an init param. Check out our OpenSearchBM25Retriever as an example.
The idea is for the retriever to call the method directly from the Store, e.g. this is how the BM25 retrieval is run in the linked component: docs = doc_store._bm25_retrieval(**bm25_args).
So it would be great if we could follow that pattern here as well.
Refactored — CogneeRetriever.__init__ now takes a CogneeMemoryStore instance (following the OpenSearchBM25Retriever pattern). The retriever delegates search to store.search_memories() and handles to_dict/from_dict serialization of the nested store. Tests, examples, and integration tests all updated.
Hi @sjrl, sorry for the delay! It's been an intense few weeks but I'm back on this. I've addressed all the open review comments above. The one remaining issue is the Python 3.14 on Windows CI failure. It's caused by kuzu (a dependency via cognee, the embedded graph DB I mentioned above). Linux and macOS pass fine. Would it be okay to exclude the Windows + 3.14 combination from the CI matrix for now? We'd still have 3.14 coverage on Linux and macOS, and Windows coverage via 3.10. Let me know what you think.
No problem! Thanks for getting back to this. I should have some time to review this in the next few days. But to answer your question: yes, it would be fine to remove the Windows + 3.14 combo for now; let's just leave a comment there explaining the problem.
    Converts the given UUID string to a cognee User via ``cognee.modules.users.methods.get_user``.

    :param user_id: UUID string identifying the cognee user.
    :returns: A cognee ``User`` object.

Suggested change:

- :returns: A cognee ``User`` object.
+ :returns: A cognee `User` object.

    """
    Resolve a user_id string to a cognee User object.

    Converts the given UUID string to a cognee User via ``cognee.modules.users.methods.get_user``.

Suggested change:

- Converts the given UUID string to a cognee User via ``cognee.modules.users.methods.get_user``.
+ Converts the given UUID string to a cognee User via `cognee.modules.users.methods.get_user`.
pull_request:
  paths:
    - "integrations/cognee/**"
    - "!integrations/cognee/*.md"
    - ".github/workflows/cognee.yml"

The push trigger looks to be missing:

  pull_request:
    paths:
      - "integrations/cognee/**"
      - "!integrations/cognee/*.md"
      - ".github/workflows/cognee.yml"
+ push:
+   branches:
+     - main
+   paths:
+     - "integrations/cognee/**"
+     - "!integrations/cognee/*.md"
+     - ".github/workflows/cognee.yml"
env:
  PYTHONUNBUFFERED: "1"
  FORCE_COLOR: "1"
Let's go ahead and add the LLM API key here that we will need, e.g. LLM_API_KEY: ${{ secrets.OPENAI_API_KEY }}.
Which provider's api key would be appropriate to use?
- name: Run tests
  run: hatch run test:cov-retry
We like to separate unit and integration test runs:

- - name: Run tests
-   run: hatch run test:cov-retry
+ - name: Run unit tests
+   run: hatch run test:unit-cov-retry
- name: Run tests
  run: hatch run test:cov-retry
Let's add the integration tests as well:

+ - name: Run integration tests
+   run: hatch run test:integration-cov-append-retry
unit = 'pytest -m "not integration" {args:tests}'
integration = 'pytest -m "integration" {args:tests}'
all = 'pytest {args:tests}'
cov-retry = 'pytest --cov=haystack_integrations --reruns 3 --reruns-delay 30 -x {args:tests}'
Apologies, we've made some changes to the default pyproject.toml since this PR was opened.

- cov-retry = 'pytest --cov=haystack_integrations --reruns 3 --reruns-delay 30 -x {args:tests}'
+ unit-cov-retry = 'pytest --cov=haystack_integrations --reruns 3 --reruns-delay 30 -x -m "not integration" {args:tests}'
+ integration-cov-append-retry = 'pytest --cov=haystack_integrations --cov-append --reruns 3 --reruns-delay 30 -x -m "integration" {args:tests}'
Could we rename this to demo_memory_store.py since we aren't actually using an Agent component in this demo?
    self.auto_cognify = auto_cognify

@component.output_types(documents_written=int)
def run(self, documents: list[Document], user_id: str | None = None) -> dict[str, Any]:
- def run(self, documents: list[Document], user_id: str | None = None) -> dict[str, Any]:
+ def run(self, documents: list[Document], user_id: str | None = None) -> dict[str, int]:
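The suggested tightening of the return annotation can be illustrated with a plain-Python stand-in. ToyWriter and its simplified Document are hypothetical; the point is only that a run method declaring a single int output via @component.output_types is more precisely typed as dict[str, int] than dict[str, Any]:

```python
# Toy stand-in (not the real CogneeWriter) showing why dict[str, int] fits:
# the method only ever returns an int-valued dict.

from dataclasses import dataclass


@dataclass
class Document:
    content: str


class ToyWriter:
    def __init__(self) -> None:
        self._stored: list[Document] = []

    def run(self, documents: list[Document]) -> dict[str, int]:
        # "Write" the documents, then report how many were written,
        # matching the single documents_written output.
        self._stored.extend(documents)
        return {"documents_written": len(documents)}


writer = ToyWriter()
print(writer.run([Document("a"), Document("b")]))  # {'documents_written': 2}
```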
effective_top_k = top_k or self.top_k

try:
    raw_results = run_sync(
        cognee.search(
            query_text=query,
            query_type=search_type_enum,
            user=user,
            datasets=[self.dataset_name],
It seems we should pass the top_k directly to cognee.search, which accepts a top_k param.
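A runnable sketch of the suggested change, with fake_search standing in for cognee.search (its top_k parameter is taken from the review comment above) so the snippet runs without cognee installed:

```python
# Instead of fetching everything and slicing afterwards, forward top_k
# straight to the search call and let the backend cap the result count.
import asyncio


async def fake_search(query_text: str, top_k: int = 10) -> list[str]:
    # Stand-in: a real call would be cognee.search(query_text=..., top_k=...).
    corpus = [f"match {i} for {query_text!r}" for i in range(100)]
    return corpus[:top_k]


def run_sync(coro):
    # Mirrors the run_sync helper used in the component.
    return asyncio.run(coro)


effective_top_k = 3
raw_results = run_sync(fake_search(query_text="memory", top_k=effective_top_k))
assert len(raw_results) == 3
```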
Hey @hande-k I wanted to provide you with an example script of how I imagined the cognee integration to work with Haystack. The main use case for us is to enable memories for our Agent component, so I think the Retriever and Writer components need to be reworked to return and accept lists of ChatMessages respectively. Here is a demo script I tried out that assumes these changes to the components have already been made. Feel free to include this in the examples folder if you'd like.

#!/usr/bin/env python
"""
Demo: Memory-Augmented Agent with CogneeRetriever and CogneeWriter

Shows the core memory-agent loop where Cognee enriches every conversation
turn with memories from past sessions alongside the live chat history:

1. Pre-seeded memories: facts from previous sessions are loaded into Cognee
   before the conversation starts, simulating persistent long-term memory.
2. Before each turn: CogneeRetriever fetches relevant memories and the
   OutputAdapter prepends them to the live chat history + current user message.
3. The Agent processes the full context (memories + history + user message).
4. After each turn: CogneeWriter stores the agent's output messages so future
   sessions can recall what happened in this one.

Pipeline structure (per turn):

    query ──► CogneeRetriever ──► memories (list[ChatMessage]) ──┐
                                                                 ├──► OutputAdapter ──► agent.messages
    history + user_message (pipeline inputs) ────────────────────┘
                                                    │
                                                    Agent ──► messages, last_message
                                                    │
                                                    CogneeWriter ──► documents_written

NOTE: This demo assumes CogneeRetriever and CogneeWriter have been updated to
work with ChatMessages rather than Documents. See the review comments for the
required changes.

Prerequisites:

    pip install -e "integrations/cognee"

Set your LLM and vector DB keys (Cognee uses them internally):

    export OPENAI_API_KEY="sk-..."
"""
import logging
import os

# Must be set before importing cognee — its setup_logging() reads LOG_LEVEL at
# import time and installs structlog handlers that bypass standard logging config.
os.environ.setdefault("LOG_LEVEL", "WARNING")

import asyncio

import cognee
from haystack import Pipeline

logging.basicConfig(level=logging.WARNING)

from haystack.components.agents import Agent
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage

from haystack_integrations.components.retrievers.cognee import CogneeRetriever
from haystack_integrations.components.writers.cognee import CogneeWriter
from haystack_integrations.memory_stores.cognee import CogneeMemoryStore

DATASET_NAME = "agent_memory_demo"

# Facts that would have been stored in a previous session.
SEEDED_MEMORIES = [
    "My name is Alice. I'm a senior data scientist at Acme Corp specialising in NLP and knowledge graphs.",
    "My current project is building an internal documentation search system powered by Haystack and Cognee.",
    "My team: Bob is the ML engineer and Carol handles infrastructure.",
    "I prefer concise answers with Python code examples over long prose explanations.",
]

SYSTEM_PROMPT = (
    "You are a helpful assistant with access to a persistent memory of past conversations. "
    "Any system messages at the start of the conversation contain relevant memories retrieved "
    "from previous interactions — use them to personalise your responses and maintain continuity "
    "across turns. "
    "Be concise. Prefer short answers and Python code examples over long prose unless the user "
    "asks for more detail."
)


def build_memory_agent_pipeline(store: CogneeMemoryStore) -> Pipeline:
    """
    Wire the memory-augmented agent pipeline.

    The OutputAdapter merges three inputs into one flat ChatMessage list for
    the Agent:
    - memories: retrieved from Cognee by CogneeRetriever
    - history: accumulated chat messages from previous turns
    - user_messages: the current user message

    unsafe=True is required so that the Jinja NativeEnvironment returns the
    concatenated list as a real Python object rather than its string representation.
    """
    pipeline = Pipeline()
    pipeline.add_component("retriever", CogneeRetriever(memory_store=store))
    pipeline.add_component(
        "injector",
        OutputAdapter(
            template="{{ memories + history + user_messages }}",
            output_type=list[ChatMessage],
            unsafe=True,
        ),
    )
    pipeline.add_component(
        "agent",
        Agent(
            chat_generator=OpenAIChatGenerator(model="gpt-4o-mini"),
            system_prompt=SYSTEM_PROMPT,
        ),
    )
    pipeline.add_component("writer", CogneeWriter(dataset_name=store.dataset_name))

    pipeline.connect("retriever.messages", "injector.memories")
    pipeline.connect("injector.output", "agent.messages")
    pipeline.connect("agent.messages", "writer.messages")
    return pipeline


def run_turn(
    pipeline: Pipeline,
    user_text: str,
    history: list[ChatMessage],
) -> str:
    """
    Run one conversation turn, update history in-place, and return the agent reply.

    History is maintained outside the pipeline so it contains only the clean
    user/assistant exchange — not the memory context messages, which are
    injected fresh on every turn.
    """
    result = pipeline.run(
        {
            "retriever": {"query": user_text},
            "injector": {
                "history": history,
                "user_messages": [ChatMessage.from_user(user_text)],
            },
        }
    )
    last_message = result["agent"]["last_message"]
    reply = last_message.text or "(no reply)"

    # Append this turn to history so the next turn has full conversational context.
    history.append(ChatMessage.from_user(user_text))
    history.append(last_message)
    return reply


async def main():
    print("=== Cognee Memory Agent Pipeline Demo ===\n")

    print("Pruning previous data for a clean start...")
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)
    print("Done.\n")

    # -------------------------------------------------------------------------
    # Pre-seed long-term memories from a simulated previous session.
    # In a real application these would already be in Cognee from earlier runs.
    # -------------------------------------------------------------------------
    print("Seeding memories from a previous session...")
    store = CogneeMemoryStore(search_type="GRAPH_COMPLETION", dataset_name=DATASET_NAME)
    store.add_memories(messages=[ChatMessage.from_user(fact) for fact in SEEDED_MEMORIES])
    print(f"  {len(SEEDED_MEMORIES)} facts stored.\n")

    pipeline = build_memory_agent_pipeline(store)
    history: list[ChatMessage] = []

    # -------------------------------------------------------------------------
    # Multi-turn conversation. The Agent receives retrieved memories + live
    # history + the current message on every turn, so it can both recall past
    # facts and follow the thread of the current exchange.
    # -------------------------------------------------------------------------
    turns = [
        # Turn 1 — recall a specific memory (no history yet)
        "Hi! Can you remind me what project I'm currently working on?",
        # Turn 2 — follow-up that requires turn 1 to make sense
        "What's the tech stack we're using for it?",
        # Turn 3 — recall a different memory fact
        "Who else is on my team, and what are their roles?",
        # Turn 4 — requires both a recalled preference *and* conversation context
        "Based on what you know about me, give me a quick tip for structuring a new Haystack pipeline component.",
    ]

    for user_text in turns:
        print(f"User: {user_text}")
        reply = run_turn(pipeline, user_text, history)
        print(f"Agent: {reply}\n")

    print("=== Done ===")


if __name__ == "__main__":
    asyncio.run(main())
Closes https://github.com/deepset-ai/haystack-private/issues/240
Summary
Adds CogneeWriter, CogneeCognifier, CogneeRetriever, and CogneeMemoryStore:

- CogneeWriter ingests Haystack Documents into Cognee memory via cognee.add() + optional cognee.cognify()
- CogneeRetriever searches Cognee's memory and returns Haystack Documents
- CogneeCognifier wraps cognee.cognify() as a standalone pipeline step
- CogneeMemoryStore implements the MemoryStore protocol from haystack-experimental for use with Haystack's experimental Agent

Test plan

- hatch run test:unit
- hatch run fmt-check
- hatch run test:types
- Demo scripts (demo_pipeline.py, demo_memory_agent.py)